package com.kool.kmqtt.server;

import com.kool.kmqtt.server.constant.PacketTypeEnum;
import com.kool.kmqtt.server.exception.AppException;
import com.kool.kmqtt.server.exception.ErrorCode;
import com.kool.kmqtt.server.packet.FixedHeader;
import com.kool.kmqtt.server.packet.Packet;
import com.kool.kmqtt.server.parser.*;
import com.kool.kmqtt.server.processer.*;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PacketReceiver {

    /**
     * 根据收到报文的固定报头分发给不同的报文处理器，为报文处理器初始化报文解析器
     *
     * @param ctx
     * @param in
     */
    public Packet receive(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //解析固定报头
        FixedHeader fixedHeader = PacketParser.parseFixedHeader(in);
        //创建报文处理器实例
        PacketProcessor packetProcessor = getPacketProcesser(ctx, fixedHeader);
        //执行报文处理器
       return packetProcessor.process(in);
    }

    /**
     * 创建报文处理器实例
     *
     * @param ctx
     * @param fixedHeader
     * @return
     */
    private PacketProcessor getPacketProcesser(ChannelHandlerContext ctx, FixedHeader fixedHeader) {
        int packetType = fixedHeader.getPacketType();
        if (PacketTypeEnum.CONNECT.getCode() == packetType) {
            return new ConnectPacketProcessor(ctx, new ConnectPacketParser(fixedHeader));
        } else if (PacketTypeEnum.PUBLISH.getCode() == packetType) {
            return new PublishPacketProcessor(ctx, new PublishPacketParser(fixedHeader));
        } else if (PacketTypeEnum.PUBACK.getCode() == packetType) {
            return new PubackPacketProcessor(ctx, new PubackPacketParser(fixedHeader));
        } else if (PacketTypeEnum.PUBREC.getCode() == packetType) {
            return new PubrecPacketProcessor(ctx, new PubrecPacketParser(fixedHeader));
        } else if (PacketTypeEnum.PUBREL.getCode() == packetType) {
            return new PubrelPacketProcessor(ctx, new PubrelPacketParser(fixedHeader));
        } else if (PacketTypeEnum.PUBCOMP.getCode() == packetType) {
            return new PubcompPacketProcessor(ctx, new PubcompPacketParser(fixedHeader));
        } else if (PacketTypeEnum.SUBSCRIBE.getCode() == packetType) {
            return new SubscribePacketProcessor(ctx, new SubscribePacketParser(fixedHeader));
        } else if (PacketTypeEnum.UNSUBSCRIBE.getCode() == packetType) {
            return new UnsubscribePacketProcessor(ctx, new UnsubscribePacketParser(fixedHeader));
        } else if (PacketTypeEnum.PINGREQ.getCode() == packetType) {
            return new PingreqPacketProcessor(ctx, new PingreqPacketParser(fixedHeader));
        } else if (PacketTypeEnum.DISCONNECT.getCode() == packetType) {
            return new DisconnectPacketProcessor(ctx, new DisconnectPacketParser(fixedHeader));
        } else {
            throw new AppException(ErrorCode.UNDEFINED_PACKET_TYPE, Integer.toString(packetType));
        }

    }
}
